use crate::data_frame::single_data::SingleData;
use crate::data_frame::{
    value::{ConfValue, RealValue},
};
use crate::run_plan::HandResult;
use crate::indicators::{indicators_key_ref, IndicatorsMode, Op};
use crate::lazy_static::__Deref;
use crate::physical_plan::PhysicalPlanExprArg;
use chrono::{DateTime, Datelike, FixedOffset, NaiveDateTime, Timelike, Utc};
use rand::prelude::*;
use regex::Regex;
use std::cmp::Ordering;
use std::net::{IpAddr, Ipv4Addr};
use crate::physical_plan::window::SortByValue;

macro_rules! compare {
    // macth like arm for macro
    ($args:expr,$data:expr,$cmp:ident,$_cmp:ident,$cmpname:expr) => {
        // macro expand to this code
        {
            if $args.len() != 2 {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    $cmpname,
                    "arg len err",
                    1,
                );
                return HandResult::Err(());
            }

            let left = &$args[0];
            let right = &$args[1];

            match left.deref() {
                PhysicalPlanExprArg::ConfValue(left) => match right.deref() {
                    PhysicalPlanExprArg::ConfValue(_) => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $cmpname,
                            "left and right both ConfValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                    PhysicalPlanExprArg::RealValue(right) => {
                        return left.$cmp(right);
                    }
                    PhysicalPlanExprArg::ElementValue(right) => {
                        let right = $data.data.get_key_value(right)?;
                        return left.$cmp(&right);
                    },
                },
                PhysicalPlanExprArg::RealValue(left) => match right.deref() {
                    PhysicalPlanExprArg::ConfValue(right) => {
                        return ConfValue::$_cmp(left, right);
                    }
                    PhysicalPlanExprArg::RealValue(right) => {
                        return left.$cmp(right);
                    }
                    PhysicalPlanExprArg::ElementValue(right) => {
                        let right = $data.data.get_key_value(right)?;
                        return left.$cmp(&right);
                    },
                },
                PhysicalPlanExprArg::ElementValue(left) => {
                    let left = $data.data.get_key_value(left)?;
                    match right.deref() {
                        PhysicalPlanExprArg::ConfValue(right) => {
                            return ConfValue::$_cmp(&left, right);
                        }
                        PhysicalPlanExprArg::RealValue(right) => {
                            return left.$cmp(right);
                        }
                        PhysicalPlanExprArg::ElementValue(right) => {
                            let right = $data.data.get_key_value(right)?;
                            return left.$cmp(&right);
                        }
                    }
                },
            }
        }
    };
}

pub fn gt(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    compare!(args, data, gt, _gt, "gt")
}

pub fn ge(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    compare!(args, data, ge, _ge, "ge")
}

pub fn lt(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    compare!(args, data, lt, _lt, "lt")
}

pub fn le(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    compare!(args, data, le, _le, "le")
}

pub fn eq(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    compare!(args, data, eq, _eq, "eq")
}

pub fn exist(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() > 16 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "exist",
            "args too many(> 16) err ",
            1,
        );
        return HandResult::Err(());
    }

    for elm in args {
        match &elm.deref() {
            PhysicalPlanExprArg::ElementValue(elm) => {
                    let _v = &data.data.get_key_value(elm)?;
            },
            PhysicalPlanExprArg::ConfValue(_left) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "exist",
                    "elements are ConfValue err",
                    1,
                );
                return HandResult::Err(());
            }
            PhysicalPlanExprArg::RealValue(_left) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "exist",
                    "elements are RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        }
    }
    return HandResult::Ok(RealValue::Boolean(true));
}

pub fn regex(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 2 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "regex",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let regex = &args[1];
    let elm_value: String;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            if let Ok(a) = &data.data.get_key_value(left) {
                match a {
                    RealValue::Utf8(a) => elm_value = a.to_string(),
                    _ => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "regex",
                            "left is illegal err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "regex",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "regex",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let tmp_regex: &str;
    match regex.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_regex = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_regex = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "regex",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "regex",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let regex_str = format!(r"^{}", tmp_regex);
    if let Ok(a) = Regex::new(&regex_str) {
        let re = a;
        return HandResult::Ok(RealValue::Boolean(re.is_match(&elm_value)));
    } else {
        return HandResult::Ok(RealValue::Boolean(false));
    }
}

pub fn is_bytes(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 2 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_bytes",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let regex = &args[1];
    let elm_value: Vec<u8>;
    let tmp_data: &mut SingleData;

    //find elements & its value
    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            tmp_data = data;
            match &tmp_data.data.get_key_value(left) {
                Ok(a) => match a {
                    RealValue::Bytes(a) => elm_value = a.clone(),
                    RealValue::Utf8(a) => elm_value = a.clone().into_bytes(),
                    _ => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_bytes",
                            "left is illegal err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                },
                Err(()) => {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_bytes",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_bytes",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let decoded: Vec<u8>;
    let key: String;
    match regex.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            key = right.value.clone();
            let a = &right.value;
            if a[..2].to_string() == "0x" {
                decoded = hex::decode(a[2..].to_string()).expect("Decoding failed");
            } else {
                decoded = a.clone().into_bytes();
            }
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                key = a.clone();
                if a[..2].to_string() == "0x" {
                    decoded = hex::decode(a[2..].to_string()).expect("Decoding failed");
                } else {
                    decoded = a.clone().into_bytes();
                }
            }
            RealValue::Bytes(a) => {
                if let Ok(a) = String::from_utf8(a.clone()) {
                    key = a;
                } else {
                    return HandResult::Err(());
                }
                decoded = a.clone();
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_bytes",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_bytes",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let mut count = 0;
    let key_len = decoded.len();
    for i in 0..(elm_value.len() - key_len + 1) {
        if elm_value[i..i + key_len] == decoded {
            count += 1;
            tmp_data
                .metadata
                .high_light_insert(key.clone(), (i, key_len));
        }
    }

    if count > 0 {
        return HandResult::Ok(RealValue::Boolean(true));
    }
    return HandResult::Ok(RealValue::Boolean(false));
}

pub fn like(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 2 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "like",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let regex = &args[1];
    let elm_value: String;
    let tmp_data: &mut SingleData;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            tmp_data = data;
            if let Ok(a) = &tmp_data.data.get_key_value(left) {
                if let RealValue::Utf8(s) = a {
                    elm_value = s.to_string()
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "like",
                        "left is illegal err",
                        1,
                    );
                    return HandResult::Err(());
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "like",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "like",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let tmp_cmp_str: &str;
    match regex.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_cmp_str = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_cmp_str = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "like",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "like",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let bytes = tmp_cmp_str.as_bytes();
    let elm_bytes = elm_value.as_bytes();
    let mut count = 0;

    if bytes[0] == "%".as_bytes()[0] && bytes[bytes.len()] == "%".as_bytes()[0] {
        let key_len = bytes.len() - 2;
        let key;
        if let Ok(a) = String::from_utf8(bytes[1..bytes.len() - 1].to_vec()) {
            key = a;
        } else {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "like",
                "String to vec err",
                1,
            );
            return HandResult::Err(());
        }
        for i in 0..(elm_bytes.len() - key_len + 1) {
            if elm_bytes[i..i + key_len] == bytes[1..bytes.len() - 1] {
                count += 1;
                tmp_data
                    .metadata
                    .high_light_insert(key.clone(), (i, key_len));
            }
        }
    } else if bytes[0] == "%".as_bytes()[0] {
        if elm_bytes.len() >= bytes.len() - 1 {
            if bytes[1..bytes.len()]
                == elm_bytes[elm_bytes.len() - bytes.len() + 1..elm_bytes.len()]
            {
                let key;
                if let Ok(a) = String::from_utf8(bytes[1..bytes.len()].to_vec()) {
                    key = a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "like",
                        "String to vec err",
                        1,
                    );
                    return HandResult::Err(());
                }
                tmp_data
                    .metadata
                    .high_light_insert(key, (elm_bytes.len() - bytes.len() + 1, bytes.len() - 1));
                count += 1;
            }
        }
    } else if bytes[bytes.len() - 1] == "%".as_bytes()[0] {
        if elm_bytes.len() >= bytes.len() - 1 {
            if bytes[1..bytes.len()] == elm_bytes[0..bytes.len() - 1] {
                let key;
                if let Ok(a) = String::from_utf8(bytes[1..bytes.len()].to_vec()) {
                    key = a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "like",
                        "String to vec err",
                        1,
                    );
                    return HandResult::Err(());
                }

                tmp_data
                    .metadata
                    .high_light_insert(key, (0, bytes.len() - 1));
                count += 1;
            }
        }
    }

    if count > 0 {
        return HandResult::Ok(RealValue::Boolean(true));
    }
    return HandResult::Ok(RealValue::Boolean(false));
}

pub fn is_range(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 3 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_range",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let low = &args[1];
    let high = &args[2];
    let elm_value: String;
    let tmp_data: &mut SingleData;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            tmp_data = data;
            if let Ok(a) = &tmp_data.data.get_key_value(left) {
                if let RealValue::Utf8(s) = a {
                    elm_value = s.to_string()
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_range",
                        "left is illegal err",
                        1,
                    );
                    return HandResult::Err(());
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_range",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_range",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let tmp_low: &str;
    match low.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_low = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_low = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_range",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Ok(RealValue::None);
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_range",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let tmp_high: &str;
    match high.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_high = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_high = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_range",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_range",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let elm_bytes = elm_value.as_bytes();
    let bytes = tmp_low.as_bytes();
    let mut count = 0;
    let key_len = tmp_low.len();
    let high_key_len = tmp_high.len();
    let mut low_index = 0;
    let mut high_index = 0;
    for i in 0..(elm_bytes.len() - key_len + 1) {
        if elm_bytes[i..i + key_len] == bytes[0..] {
            count += 1;
            low_index = i;
            break;
        }
    }

    if count > 0 {
        let bytes = tmp_high.as_bytes();
        let key_len = tmp_high.len();

        for i in (low_index + key_len + 1)..(elm_bytes.len() - high_key_len + 1) {
            if elm_bytes[i..i + high_key_len] == bytes[0..] {
                count += 1;
                high_index = i;
                break;
            }
        }

        if count == 2 {
            tmp_data
                .metadata
                .high_light_insert(tmp_low.to_string().clone(), (low_index, key_len));
            tmp_data
                .metadata
                .high_light_insert(tmp_high.to_string().clone(), (high_index, high_key_len));
            return HandResult::Ok(RealValue::Boolean(true));
        }
    } else {
        return HandResult::Ok(RealValue::Boolean(false));
    }

    return HandResult::Ok(RealValue::Boolean(false));
}

pub fn is_prefix(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 2 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_prefix",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let prefix = &args[1];
    let elm_value: IpAddr;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            if let Ok(a) = &data.data.get_key_value(left) {
                match a {
                    RealValue::Utf8(a) => {
                        if let Ok(v) = a.parse() {
                            elm_value = v
                        } else {
                            return HandResult::Ok(RealValue::Boolean(false));
                        }
                    }
                    _ => {
                        return HandResult::Ok(RealValue::Boolean(false));
                    }
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_prefix",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_prefix",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let tmp_prefix: &str;
    match prefix.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_prefix = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_prefix = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_prefix",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_prefix",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let pos: Vec<&str> = tmp_prefix.split("/").collect();
    if pos.len() != 2 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_prefix",
            "right is illegal prefix",
            1,
        );
        return HandResult::Err(());
    }

    let prefixlen: usize;
    let prefix;

    if let Ok(a) = pos[1].parse() {
        prefixlen = a;
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_prefix",
            "Parse error",
            1,
        );
        return HandResult::Err(());
    }

    if let Ok(a) = pos[0].parse() {
        prefix = a;
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_prefix",
            "Parse error",
            1,
        );
        return HandResult::Err(());
    }

    match elm_value {
        IpAddr::V4(v) => match prefix {
            IpAddr::V4(p) => {
                if prefixlen > 32 {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_prefix",
                        "prefixlen is illegal",
                        1,
                    );
                    return HandResult::Err(());
                }
                let mask: u32 = (!0) << (32 - prefixlen);
                let p: u32 = p.into();
                let v: u32 = v.into();
                if v & mask == p & mask {
                    return HandResult::Ok(RealValue::Boolean(true));
                }
            }
            IpAddr::V6(_) => {
                if prefixlen > 128 {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_prefix",
                        "prefixlen is illegal",
                        1,
                    );
                    return HandResult::Err(());
                }
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        IpAddr::V6(v) => match prefix {
            IpAddr::V6(p) => {
                if prefixlen > 128 {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_prefix",
                        "prefixlen is illegal",
                        1,
                    );
                    return HandResult::Err(());
                }

                let mut n = prefixlen / 8;
                let a = prefixlen % 8;
                let p = p.octets();
                let v = v.octets();

                if a > 0 {
                    n = n + 1;
                }
                for i in 0..n {
                    if i < n {
                        if p[i] & 0xFF != v[i] & 0xFF {
                            return HandResult::Ok(RealValue::Boolean(false));
                        }
                    } else {
                        let m: u8 = (!0) << (8 - a);
                        if p[i] & m != v[i] & m {
                            return HandResult::Ok(RealValue::Boolean(false));
                        }
                    }
                }

                return HandResult::Ok(RealValue::Boolean(true));
            }
            IpAddr::V4(_) => {
                if prefixlen > 32 {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_prefix",
                        "prefixlen is illegal",
                        1,
                    );
                    return HandResult::Err(());
                }
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
    }

    return HandResult::Ok(RealValue::Boolean(false));
}

macro_rules! ip_check {
    // macth like arm for macro
    ($args:expr,$data:expr,$check:ident,$checkname:expr,$ipflag:ident) => {
        // macro expand to this code
        {
            if $args.len() > 16 {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    $checkname,
                    "args too many(> 16) err ",
                    1,
                );
                return HandResult::Err(());
            }

            for elm in $args {
                match &elm.deref() {
                    PhysicalPlanExprArg::ElementValue(elm) => {
                        if let Ok(a) = $data.data.get_key_value(elm) {
                            match a {
                                RealValue::Utf8(a) => {
                                    let elm_value: $ipflag;
                                    if let Ok(a) = a.parse() {
                                        elm_value = a;
                                    } else {
                                        indicators_key_ref(
                                            Op::Add,
                                            IndicatorsMode::PhysicalPlan,
                                            $checkname,
                                            "parse err",
                                            1,
                                        );
                                        return HandResult::Err(());
                                    }
                                    if elm_value.$check() != true {
                                        return HandResult::Ok(RealValue::Boolean(false));
                                    }
                                }
                                _ => {
                                    indicators_key_ref(
                                        Op::Add,
                                        IndicatorsMode::PhysicalPlan,
                                        $checkname,
                                        "left is illegal err",
                                        1,
                                    );
                                    return HandResult::Err(());
                                }
                            }
                        } else {
                            return HandResult::Ok(RealValue::None);
                        }
                    },
                    PhysicalPlanExprArg::ConfValue(_left) => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "elements are ConfValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                    PhysicalPlanExprArg::RealValue(_left) => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "elements are RealValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            }
            return HandResult::Ok(RealValue::Boolean(true));
        }
    };
}

macro_rules! ip_check_class {
    // macth like arm for macro
    ($args:expr,$data:expr,$checkname:expr,$ipclass:expr) => {
        // macro expand to this code
        {
            if $args.len() > 16 {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    $checkname,
                    "args too many(> 16) err ",
                    1,
                );
                return HandResult::Err(());
            }

            for elm in $args {
                match &elm.deref() {
                    PhysicalPlanExprArg::ElementValue(elm) => {
                        if let Ok(a) = $data.data.get_key_value(elm) {
                            match a {
                                RealValue::Utf8(a) => {
                                    let elm_value: Ipv4Addr;
                                    if let Ok(e) = a.parse() {
                                        elm_value = e;
                                    } else {
                                        indicators_key_ref(
                                            Op::Add,
                                            IndicatorsMode::PhysicalPlan,
                                            $checkname,
                                            "Ipaddr parse err",
                                            1,
                                        );
                                        return HandResult::Err(());
                                    }
                                    let oct = elm_value.octets();

                                    if $ipclass == 1 {
                                        if oct[0] < 1 || oct[0] > 126 {
                                            return HandResult::Ok(RealValue::Boolean(false));
                                        }
                                    } else if $ipclass == 2 {
                                        if oct[0] < 128 || oct[0] > 191 {
                                            return HandResult::Ok(RealValue::Boolean(false));
                                        }
                                    } else if $ipclass == 3 {
                                        if oct[0] < 192 || oct[0] > 223 {
                                            return HandResult::Ok(RealValue::Boolean(false));
                                        }
                                    } else {
                                        indicators_key_ref(
                                            Op::Add,
                                            IndicatorsMode::PhysicalPlan,
                                            $checkname,
                                            "left is illegal err",
                                            1,
                                        );
                                        return HandResult::Err(());
                                    }
                                }
                                _ => {
                                    indicators_key_ref(
                                        Op::Add,
                                        IndicatorsMode::PhysicalPlan,
                                        $checkname,
                                        "left is illegal err",
                                        1,
                                    );
                                    return HandResult::Err(());
                                }
                            }
                        } else {
                            return HandResult::Ok(RealValue::Boolean(false));
                        }
                    },
                    PhysicalPlanExprArg::ConfValue(_left) => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "elements are ConfValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                    PhysicalPlanExprArg::RealValue(_left) => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "elements are RealValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            }
            return HandResult::Ok(RealValue::Boolean(true));
        }
    };
}

pub fn is_loopback(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_loopback, "is_loopback", IpAddr)
}

pub fn is_ipv6(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_ipv6, "is_ipv6", IpAddr)
}

pub fn is_ipv4(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_ipv4, "is_ipv4", IpAddr)
}

pub fn is_unspecified(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_unspecified, "is_unspecified", IpAddr)
}

pub fn is_global(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_global, "is_global", IpAddr)
}

pub fn is_multicast(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_multicast, "is_muticast", IpAddr)
}

pub fn is_documentation(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_documentation, "is_documentation", IpAddr)
}

pub fn is_benchmarking(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_benchmarking, "is_benchmarking", IpAddr)
}

pub fn is_ipv4_broadcast(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_broadcast, "is_broadcast", Ipv4Addr)
}

pub fn is_ipv4_private(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_private, "is_private", Ipv4Addr)
}

pub fn is_ipv4_shared(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_shared, "is_shared", Ipv4Addr)
}

pub fn is_ipv4_reserved(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check!(args, data, is_reserved, "is_reserved", Ipv4Addr)
}

pub fn is_ipv4_a_class(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check_class!(args, data, "is_ipv4_a_class", 1)
}

pub fn is_ipv4_b_class(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check_class!(args, data, "is_ipv4_b_class", 2)
}

pub fn is_ipv4_c_class(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    ip_check_class!(args, data, "is_ipv4_c_class", 3)
}

pub fn is_linklocal(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() > 16 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_linklocal",
            "args too many(> 16) err ",
            1,
        );
        return HandResult::Err(());
    }

    for elm in args {
        match &elm.deref() {
            PhysicalPlanExprArg::ElementValue(elm) => {
                if let Ok(a) = &data.data.get_key_value(elm) {
                    match a {
                        RealValue::Utf8(a) => {
                            let elm_value: IpAddr;
                            if let Ok(a) = a.parse() {
                                elm_value = a;
                            } else {
                                indicators_key_ref(
                                    Op::Add,
                                    IndicatorsMode::PhysicalPlan,
                                    "is_linklocal",
                                    "Ipaddr parse err",
                                    1,
                                );
                                return HandResult::Err(());
                            }
                            match elm_value {
                                IpAddr::V4(ip) => {
                                    if ip.is_link_local() == false {
                                        return HandResult::Ok(RealValue::Boolean(false));
                                    }
                                }
                                IpAddr::V6(ip) => {
                                    if ip.is_unicast_link_local() == false {
                                        return HandResult::Ok(RealValue::Boolean(false));
                                    }
                                }
                            }
                        }
                        _ => {
                            indicators_key_ref(
                                Op::Add,
                                IndicatorsMode::PhysicalPlan,
                                "is_linklocal",
                                "left is illegal err",
                                1,
                            );
                            return HandResult::Err(());
                        }
                    }
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            },
            PhysicalPlanExprArg::ConfValue(_left) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_linklocal",
                    "elements are ConfValue err",
                    1,
                );
                return HandResult::Err(());
            }
            PhysicalPlanExprArg::RealValue(_left) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_linklocal",
                    "elements are RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        }
    }
    return HandResult::Ok(RealValue::Boolean(true));
}

pub fn is_datetime(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 3 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_datetime",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let low = &args[1];
    let high = &args[2];

    let elm_value: u64;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            if let Ok(a) = &data.data.get_key_value(left) {
                match a {
                    RealValue::U64(e) => {
                        elm_value = *e;
                    }
                    _ => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_datetime",
                            "left is illegal err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_datetime",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_datetime",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let ns = (elm_value % 1000 * 1000 * 1000) as u32;
    let sec = (elm_value / 1000) as i64;

    let date_time_value = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(sec, ns), Utc)
        .with_timezone(&FixedOffset::east(8 * 3600));

    let tmp_date: &str;
    match low.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_date = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_date = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_datetime",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_datetime",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let date1;
    if let Ok(a) = DateTime::parse_from_rfc3339(&tmp_date.replace(".", "-").replace("Z", "+")) {
        date1 = a;
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_datetime",
            "parse err",
            1,
        );
        return HandResult::Err(());
    }

    let tmp_date: &str;
    match high.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            tmp_date = &right.value;
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                tmp_date = a;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_datetime",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_datetime",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let date2;
    if let Ok(a) = DateTime::parse_from_rfc3339(&tmp_date.replace(".", "-").replace("Z", "+")) {
        date2 = a;
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_datetime",
            "parse err",
            1,
        );
        return HandResult::Err(());
    }

    match date_time_value.cmp(&date1) {
        Ordering::Less => {
            return HandResult::Ok(RealValue::Boolean(false));
        }
        _ => match date_time_value.cmp(&date2) {
            Ordering::Less => {
                return HandResult::Ok(RealValue::Boolean(true));
            }
            Ordering::Equal => {
                return HandResult::Ok(RealValue::Boolean(true));
            }
            Ordering::Greater => {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
    }
}

macro_rules! time_check {
    // macth like arm for macro
    ($args:expr,$data:expr,$checktype:ident,$check:ident,$checkname:expr) => {
        // macro expand to this code
        {
            if $args.len() > 3 {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    $checkname,
                    "arg len err",
                    1,
                );
                return HandResult::Err(());
            }

            let elm = &$args[0];
            let time1 = &$args[1];

            let elm_value: u64;

            match &elm.deref() {
                PhysicalPlanExprArg::ElementValue(left) => {
                    match $data.data.get_key_value(left)? {
                        RealValue::U64(a) => {
                            elm_value = a;
                        }
                        _ => {
                            indicators_key_ref(
                                Op::Add,
                                IndicatorsMode::PhysicalPlan,
                                $checkname,
                                "left is illegal err",
                                1,
                            );
                            return HandResult::Err(());
                        }
                    }
                },
                PhysicalPlanExprArg::ConfValue(_left) => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        $checkname,
                        "elements are ConfValue err",
                        1,
                    );
                    return HandResult::Err(());
                }
                PhysicalPlanExprArg::RealValue(_left) => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        $checkname,
                        "elements are RealValue err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }

            let ns = (elm_value % 1000 * 1000 * 1000) as u32;
            let sec = (elm_value / 1000) as i64;

            let date_time_value =
                DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(sec, ns), Utc)
                    .with_timezone(&FixedOffset::east(8 * 3600));

            let tmp_time1: $checktype;
            let tmp_time2: $checktype;

            match time1.deref() {
                PhysicalPlanExprArg::ConfValue(right) => {
                    if let Ok(a) = right.value.parse() {
                        tmp_time1 = a;
                    } else {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "parse err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
                PhysicalPlanExprArg::RealValue(right) => match right {
                    RealValue::Utf8(a) => {
                        if let Ok(a) = a.parse() {
                            tmp_time1 = a;
                        } else {
                            indicators_key_ref(
                                Op::Add,
                                IndicatorsMode::PhysicalPlan,
                                $checkname,
                                "parse err",
                                1,
                            );
                            return HandResult::Err(());
                        }
                    }
                    _ => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "right is RealValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                },
                PhysicalPlanExprArg::ElementValue(_) => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        $checkname,
                        "right is ElementValue err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }

            if $args.len() == 2 {
                if tmp_time1 == date_time_value.$check() {
                    return HandResult::Ok(RealValue::Boolean(true));
                }

                return HandResult::Ok(RealValue::Boolean(false));
            } else if $args.len() == 3 {
                let time2 = &$args[2];

                match time2.deref() {
                    PhysicalPlanExprArg::ConfValue(right1) => {
                        if let Ok(a) = right1.value.parse() {
                            tmp_time2 = a;
                        } else {
                            indicators_key_ref(
                                Op::Add,
                                IndicatorsMode::PhysicalPlan,
                                $checkname,
                                "parse err",
                                1,
                            );
                            return HandResult::Err(());
                        }
                        if date_time_value.$check() >= tmp_time1
                            && date_time_value.$check() <= tmp_time2
                        {
                            return HandResult::Ok(RealValue::Boolean(true));
                        } else {
                            return HandResult::Ok(RealValue::Boolean(false));
                        }
                    }
                    PhysicalPlanExprArg::RealValue(right) => match right {
                        RealValue::Utf8(a) => {
                            if let Ok(a) = a.parse() {
                                tmp_time2 = a;
                            } else {
                                indicators_key_ref(
                                    Op::Add,
                                    IndicatorsMode::PhysicalPlan,
                                    $checkname,
                                    "parse err",
                                    1,
                                );
                                return HandResult::Err(());
                            }
                            if date_time_value.$check() >= tmp_time1
                                && date_time_value.$check() <= tmp_time2
                            {
                                return HandResult::Ok(RealValue::Boolean(true));
                            } else {
                                return HandResult::Ok(RealValue::Boolean(false));
                            }
                        }
                        _ => {
                            indicators_key_ref(
                                Op::Add,
                                IndicatorsMode::PhysicalPlan,
                                $checkname,
                                "right is RealValue err",
                                1,
                            );
                            return HandResult::Err(());
                        }
                    },
                    PhysicalPlanExprArg::ElementValue(_) => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            $checkname,
                            "right is ElementValue err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            } else {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    $checkname,
                    "arg len err",
                    1,
                );
                return HandResult::Err(());
            }
        }
    };
}

pub fn is_year(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    time_check!(args, data, i32, year, "is_year")
}

pub fn is_month(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    time_check!(args, data, u32, month, "is_month")
}

pub fn is_day(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    time_check!(args, data, u32, day, "is_day")
}

pub fn is_hour(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    time_check!(args, data, u32, hour, "is_hour")
}

pub fn is_minute(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    time_check!(args, data, u32, minute, "is_minute")
}

pub fn is_second(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    time_check!(args, data, u32, second, "is_second")
}

pub fn is_weekday(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() > 3 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_weekday",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let time1 = &args[1];

    let elm_value: u64;
    //1. 鍏堣В鏋恊lement鍊?
    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            if let Ok(a) = &data.data.get_key_value(left) {
                match a {
                    RealValue::U64(a) => {
                        elm_value = *a;
                    }
                    _ => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_weekday",
                            "left is illegal err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_weekday",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_weekday",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let ns = (elm_value % 1000 * 1000 * 1000) as u32;
    let sec = (elm_value / 1000) as i64;

    let date_time_value = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(sec, ns), Utc)
        .with_timezone(&FixedOffset::east(8 * 3600));

    // //2. 瑙ｆ瀽regex
    let low: u32;
    let high: u32;

    match time1.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            if let Ok(a) = right.value.parse() {
                low = a;
            } else {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_weekday",
                    "parse err",
                    1,
                );
                return HandResult::Err(());
            }
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                if let Ok(a) = a.parse() {
                    low = a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_weekday",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_weekday",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_weekday",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    if args.len() == 2 {
        if low == date_time_value.weekday().number_from_monday() {
            return HandResult::Ok(RealValue::Boolean(true));
        }

        return HandResult::Ok(RealValue::Boolean(false));
    } else if args.len() == 3 {
        let time2 = &args[2];

        match time2.deref() {
            PhysicalPlanExprArg::ConfValue(right1) => {
                if let Ok(a) = right1.value.parse() {
                    high = a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_weekday",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }

                if date_time_value.weekday().number_from_monday() >= low
                    && date_time_value.weekday().number_from_monday() <= high
                {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            PhysicalPlanExprArg::RealValue(right) => match right {
                RealValue::Utf8(a) => {
                    if let Ok(a) = a.parse() {
                        high = a;
                    } else {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_weekday",
                            "parse err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                    if date_time_value.weekday().number_from_monday() >= low
                        && date_time_value.weekday().number_from_monday() <= high
                    {
                        return HandResult::Ok(RealValue::Boolean(true));
                    } else {
                        return HandResult::Ok(RealValue::Boolean(false));
                    }
                }
                _ => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_weekday",
                        "right is RealValue err",
                        1,
                    );
                    return HandResult::Err(());
                }
            },
            PhysicalPlanExprArg::ElementValue(_) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_weekday",
                    "right is ElementValue err",
                    1,
                );
                return HandResult::Err(());
            }
        }
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_weekday",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }
}

pub fn is_duration(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() > 3 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_duration",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let time1 = &args[1];

    let elm_value: u64;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            if let Ok(a) = &data.data.get_key_value(left) {
                match a {
                    RealValue::U64(a) => {
                        elm_value = *a;
                    }
                    _ => {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_duration",
                            "left is illegal err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
            } else {
                return HandResult::Ok(RealValue::Boolean(false));
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_duration",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_duration",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let tmp_year: u64;
    let tmp_year2: u64;

    match time1.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            if let Ok(a) = right.value.parse() {
                tmp_year = a;
            } else {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_duration",
                    "parse err",
                    1,
                );
                return HandResult::Err(());
            }
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                if let Ok(a) = a.parse() {
                    tmp_year = a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_duration",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }
            RealValue::Duration(a) => {
                tmp_year = a.num_milliseconds() as u64;
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_duration",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_duration",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    if args.len() == 2 {
        if tmp_year == elm_value {
            return HandResult::Ok(RealValue::Boolean(true));
        }

        return HandResult::Ok(RealValue::Boolean(false));
    } else if args.len() == 3 {
        let time2 = &args[2];

        match time2.deref() {
            PhysicalPlanExprArg::ConfValue(right1) => {
                if let Ok(a) = right1.value.parse() {
                    tmp_year2 = a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_duration",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }
                if elm_value >= tmp_year && elm_value <= tmp_year2 {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            PhysicalPlanExprArg::RealValue(right) => match right {
                RealValue::Utf8(a) => {
                    if let Ok(a) = a.parse() {
                        tmp_year2 = a;
                    } else {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_duration",
                            "parse err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                    if elm_value >= tmp_year && elm_value <= tmp_year2 {
                        return HandResult::Ok(RealValue::Boolean(true));
                    } else {
                        return HandResult::Ok(RealValue::Boolean(false));
                    }
                }
                RealValue::Duration(a) => {
                    tmp_year2 = a.num_milliseconds() as u64;
                    if elm_value >= tmp_year && elm_value <= tmp_year2 {
                        return HandResult::Ok(RealValue::Boolean(true));
                    } else {
                        return HandResult::Ok(RealValue::Boolean(false));
                    }
                }
                _ => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_duration",
                        "right is RealValue err",
                        1,
                    );
                    return HandResult::Err(());
                }
            },
            PhysicalPlanExprArg::ElementValue(_) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_duration",
                    "right is ElementValue err",
                    1,
                );
                return HandResult::Err(());
            }
        }
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_duration",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }
}

// eg : hmis_hmss((ts),utf8_aa:02:03,utf8_23:01:23), range from hour + minute + second
pub fn is_hms(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() > 3 || args.len() < 2 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_hms",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let elm = &args[0];
    let low = &args[1];

    let elm_value: u64;

    match &elm.deref() {
        PhysicalPlanExprArg::ElementValue(left) => {
            match &data.get_key_value_always(left) {
                RealValue::U64(s) => {
                    elm_value = *s;
                }
                _ => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "left is illegal err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }
        },
        PhysicalPlanExprArg::ConfValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_hms",
                "elements are ConfValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::RealValue(_left) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_hms",
                "elements are RealValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    let ns = (elm_value % 1000 * 1000 * 1000) as u32;
    let sec = (elm_value / 1000) as i64;

    let date_time_value = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(sec, ns), Utc)
        .with_timezone(&FixedOffset::east(8 * 3600));

    // //2. 瑙ｆ瀽regex
    let mut low_value: u32;
    let mut high_value: u32;
    let tmp_value: u32 =
        date_time_value.hour() * 10000 + date_time_value.minute() * 100 + date_time_value.second();
    match low.deref() {
        PhysicalPlanExprArg::ConfValue(right) => {
            let a = &right.value;

            if let Ok(a) = a[..2].parse::<u32>() {
                low_value = a * 10000;
            } else {
                indicators_key_ref(Op::Add, IndicatorsMode::PhysicalPlan, "is_hms", "parse err", 1);
                return HandResult::Err(());
            }

            if let Ok(a) = a[3..5].parse::<u32>() {
                low_value = low_value + a * 100;
            } else {
                indicators_key_ref(Op::Add, IndicatorsMode::PhysicalPlan, "is_hms", "parse err", 1);
                return HandResult::Err(());
            }

            if let Ok(a) = a[6..8].parse::<u32>() {
                low_value = low_value + a;
            } else {
                indicators_key_ref(Op::Add, IndicatorsMode::PhysicalPlan, "is_hms", "parse err", 1);
                return HandResult::Err(());
            }
        }
        PhysicalPlanExprArg::RealValue(right) => match right {
            RealValue::Utf8(a) => {
                if let Ok(a) = a[..2].parse::<u32>() {
                    low_value = a * 10000;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }

                if let Ok(a) = a[3..5].parse::<u32>() {
                    low_value = low_value + a * 100;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }

                if let Ok(a) = a[6..8].parse::<u32>() {
                    low_value = low_value + a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_hms",
                    "right is RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_hms",
                "right is ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
    }

    if args.len() == 2 {
        if low_value == tmp_value {
            return HandResult::Ok(RealValue::Boolean(true));
        }

        return HandResult::Ok(RealValue::Boolean(false));
    } else if args.len() == 3 {
        let high = &args[2];

        match high.deref() {
            PhysicalPlanExprArg::ConfValue(right1) => {
                let a = &right1.value;
                if let Ok(a) = a[..2].parse::<u32>() {
                    high_value = a * 10000;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }

                if let Ok(a) = a[3..5].parse::<u32>() {
                    high_value = high_value + a * 100;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }

                if let Ok(a) = a[6..8].parse::<u32>() {
                    high_value = high_value + a;
                } else {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "parse err",
                        1,
                    );
                    return HandResult::Err(());
                }
            }
            PhysicalPlanExprArg::RealValue(right) => match right {
                RealValue::Utf8(a) => {
                    if let Ok(a) = a[..2].parse::<u32>() {
                        high_value = a * 10000;
                    } else {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_hms",
                            "parse err",
                            1,
                        );
                        return HandResult::Err(());
                    }

                    if let Ok(a) = a[3..5].parse::<u32>() {
                        high_value = high_value + a * 100;
                    } else {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_hms",
                            "parse err",
                            1,
                        );
                        return HandResult::Err(());
                    }

                    if let Ok(a) = a[6..8].parse::<u32>() {
                        high_value = high_value + a;
                    } else {
                        indicators_key_ref(
                            Op::Add,
                            IndicatorsMode::PhysicalPlan,
                            "is_hms",
                            "parse err",
                            1,
                        );
                        return HandResult::Err(());
                    }
                }
                _ => {
                    indicators_key_ref(
                        Op::Add,
                        IndicatorsMode::PhysicalPlan,
                        "is_hms",
                        "right is RealValue err",
                        1,
                    );
                    return HandResult::Err(());
                }
            },
            PhysicalPlanExprArg::ElementValue(_) => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_hms",
                    "right is ElementValue err",
                    1,
                );
                return HandResult::Err(());
            }
        }

        if tmp_value >= low_value && tmp_value <= high_value {
            return HandResult::Ok(RealValue::Boolean(true));
        } else {
            return HandResult::Ok(RealValue::Boolean(false));
        }
    } else {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_hms",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }
}

pub fn is_random(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if args.len() != 1 {
        indicators_key_ref(
            Op::Add,
            IndicatorsMode::PhysicalPlan,
            "is_random",
            "arg len err",
            1,
        );
        return HandResult::Err(());
    }

    let rand_value = &args[0];
    let mut rng = thread_rng();
    let rand: u32 = rng.gen_range(0..99);

    match &rand_value.deref() {
        PhysicalPlanExprArg::ElementValue(_) => {
            indicators_key_ref(
                Op::Add,
                IndicatorsMode::PhysicalPlan,
                "is_random",
                "elements are ElementValue err",
                1,
            );
            return HandResult::Err(());
        }
        PhysicalPlanExprArg::ConfValue(left) => {
            if let Ok(a) = left.value.parse::<u32>() {
                if a > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            } else {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_random",
                    "parse err",
                    1,
                );
                return HandResult::Err(());
            }
        }
        PhysicalPlanExprArg::RealValue(left) => match left {
            RealValue::U32(a) => {
                if *a > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::U16(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::U8(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::U64(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::I32(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::I16(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::I8(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            RealValue::I64(a) => {
                if (*a) as u32 > rand {
                    return HandResult::Ok(RealValue::Boolean(true));
                } else {
                    return HandResult::Ok(RealValue::Boolean(false));
                }
            }
            _ => {
                indicators_key_ref(
                    Op::Add,
                    IndicatorsMode::PhysicalPlan,
                    "is_random",
                    "RealValue err",
                    1,
                );
                return HandResult::Err(());
            }
        },
    }
}
